package com.cloudansys.core.flink.sink;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

import java.util.List;

/**
 * Flink sink that caches the latest reading of selected sensors in redis.
 *
 * <p>For every entity whose serial code equals {@code Const.SCS_31} or
 * {@code Const.SCS_32}, the element at index 1 of its values array is written
 * to redis under the serial code as key, overwriting the previous value.
 * Entities with any other serial code are ignored.
 */
@Slf4j
public class YBToRedisSink extends RichSinkFunction<List<MultiDataEntity>> {

    /** Minimum length of the values array required to read the element at index 1. */
    private static final int MIN_VALUES_LENGTH = 2;

    /**
     * Redis connection obtained in {@link #open(Configuration)}. Marked transient
     * because it is a runtime resource created after the function is deployed and
     * must not take part in the function's serialization.
     */
    private transient Jedis jedis;

    /**
     * Obtains the redis connection from {@code DefaultConfig} once per sink instance.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
    }

    /**
     * Caches the data of the strain reference points (sensors with communication
     * numbers 31 and 32) in redis, updating it in real time.
     *
     * <p>Records that cannot be cached (null entity, values array missing or too
     * short, null value) are skipped with a warning instead of failing the sink or
     * storing the literal string "null".
     *
     * @param element batch of sensor records; a null batch is ignored
     * @param context sink context (unused)
     * @throws Exception if writing to redis fails
     */
    @Override
    public void invoke(List<MultiDataEntity> element, Context context) throws Exception {
        if (element == null) {
            return;
        }
        for (MultiDataEntity multiDataEntity : element) {
            if (multiDataEntity == null) {
                continue;
            }
            String serialCode = multiDataEntity.getSerialCode();
            boolean isReferencePoint =
                    Const.SCS_31.equals(serialCode) || Const.SCS_32.equals(serialCode);
            if (!isReferencePoint) {
                // Check the serial code first so that unrelated records with a
                // missing or short values array cannot break the sink.
                continue;
            }
            if (multiDataEntity.getValues() == null
                    || multiDataEntity.getValues().length < MIN_VALUES_LENGTH) {
                log.warn("Skip caching sensor {}: values array is missing or has fewer than {} elements",
                        serialCode, MIN_VALUES_LENGTH);
                continue;
            }
            Double value = multiDataEntity.getValues()[1];
            if (value == null) {
                log.warn("Skip caching sensor {}: value at index 1 is null", serialCode);
                continue;
            }
            // serialCode equals the matched constant here, so it doubles as the redis key.
            jedis.set(serialCode, String.valueOf(value));
        }
    }

    /**
     * Releases the redis connection, even when the superclass close fails.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (jedis != null) {
                jedis.close();
                jedis = null;
            }
        }
    }
}
